#!/usr/bin/python
import sys
import os
import traceback
import optparse
import logging

from airflow.logging_config import configure_logging
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.configuration import conf, AIRFLOW_HOME
from airflow_dag_parser.dag_parser import DagParser
from airflow_dag_parser.converter.dag_converter import DagConverter
from airflow_dag_parser.models.dw_base import ExtendJSONEncoder
# Prefer the standard-library json module; fall back to simplejson on
# interpreters that do not ship it.
try:
    import json
except ImportError:
    import simplejson as json

# Python 2 exposes reload() as a builtin; on Python 3 the name is missing and
# must be imported. importlib.reload is tried first because the legacy `imp`
# module was removed in Python 3.12 and importing it there would crash.
try:
    reload(sys)
except NameError:
    try:
        from importlib import reload
    except ImportError:
        from imp import reload

logger = logging.getLogger(__name__)

# Best effort: sys.setdefaultencoding is not available on every interpreter
# (it does not exist on Python 3), so a failure here is logged, not raised.
try:
    sys.setdefaultencoding('utf-8')
except Exception as exc:
    logger.warning(exc)


class Main(LoggingMixin):
    """Parse the DAGs in a folder and convert each one, collecting reports.

    After ``run()`` finishes, ``reports`` holds one dict per DAG with the
    keys ``dag_id``, ``file``, ``workflow``, ``message``, ``exception`` and
    ``success``.
    """

    def __init__(self, dag_folder):
        """Store the absolute path of ``dag_folder`` and reset the state."""
        self.reports = []
        self.dag_parser = None
        self.dag_folder = os.path.abspath(dag_folder)

    def run(self):
        """Parse the DAG folder and append one conversion report per DAG.

        A conversion that raises ``Exception`` does not stop the run: the
        error text and traceback are stored in that DAG's report instead.
        """
        reload(sys)
        # Put the DAG folder first on the module search path.
        sys.path.insert(0, self.dag_folder)

        self.log.info("conf: " + AIRFLOW_HOME)
        dag_parser = DagParser(self.dag_folder)
        self.dag_parser = dag_parser
        dag_parser.parse()

        parsed_dags = dag_parser.get_dags()
        for dag_id in parsed_dags:
            current = parsed_dags[dag_id]
            # Every report starts out as a failure with empty details.
            report = {
                "dag_id": dag_id,
                "file": current.fileloc,
                "workflow": None,
                "message": "",
                "exception": "",
                "success": False
            }
            try:
                converted = DagConverter(current).convert()
            except Exception as err:
                trace_text = traceback.format_exc()
                self.log.error(trace_text)
                report['success'] = False
                report['message'] = str(err)
                report['exception'] = trace_text
            else:
                report['workflow'] = converted
                report['success'] = True
            self.reports.append(report)


if __name__ == "__main__":
    # Command-line entry point: parse the options, run the exporter over the
    # DAG folder and write the collected reports to the output file as JSON.
    configure_logging()
    parser = optparse.OptionParser(
        "airflow-exporter", description="airflow exporter", version="1.0")
    parser.add_option("-d", "--dag_folder", action="store",
                      dest="dag_folder", help="airflow dag folder")
    parser.add_option("-o", "--output", action="store",
                      dest="output", help="airflow export result file")
    parser.add_option("-c", "--connections", action="store",
                      dest="connections", help="connection csv file")

    # parse_args() reads sys.argv[1:] by default, which keeps the script name
    # out of the positional arguments.
    opts, args = parser.parse_args()
    if not opts.dag_folder:
        logger.error("dag_folder not set")
        sys.exit(-1)

    dag_folder = opts.dag_folder

    if not opts.output:
        logger.error("output file not set")
        sys.exit(-1)

    output = opts.output

    if opts.connections:
        # Export the CSV path through the environment. The value must come
        # from the parsed options; a bare `connections` name is undefined
        # here and raised NameError whenever -c was supplied.
        os.environ['connections_csv'] = opts.connections

    main = Main(dag_folder)
    main.run()

    js = json.dumps(main.reports, indent=4, cls=ExtendJSONEncoder)
    # The with-block flushes and closes the file on exit.
    with open(output, 'w') as fd:
        fd.write(js)
